Kafka
作为一个消息中间件,在某些情况下可能会丢失消息,如果在一些重要的业务场景中,消息的丢失可能会带来重大的问题,因此我们简单分析下丢消息的场景以及如何避免:
生产者丢消息
对于生产者来说,一般丢消息指的是生产者认为消息已经发送,但是其实由于网络不通,网络抖动等消息并没有发送成功,这样的场景便是消息的丢失。对于生产者来说,出现这样的场景一般都在于调用了同步的send()
方法而没有去管返回的结果。
明白了原因,解决方案便很简单,那就是对于每个同步发送方法,都需要获取其发送结果,同时对结果进行判断,增加失败重试的机制。
Broker 丢消息
Broker
端丢消息的原因主要在于Broker
如果突然Down
机,而导致的丢消息。Kafka
的Broker
端通过消息的冗余实现了高可用。而这种高可用是依赖于主从同步实现的,因此如果当一条消息发送到Broker
之后,Leader
突然挂掉,由于消息还没有来得及同步到follower
端,因此这条消息便会丢失。
解决方案便是再给生产者返回发送成功的消息之前,需要确定消息至少同步到了一个follower
中再返回,也就是配置acks=1
,当然,如果业务场景非常重要,则可以配置acks=all
,但是牺牲的是性能
消费者丢消息
消费者丢消息的情况比较多,且比较复杂,因此这里先简单介绍下消费者的消费方式。
默认情况下,消费消息的步骤如下:
- 消费者主动调用
poll()
方法拉取消息 - 消费者拉取消息后,主动上报目前的消费进度到
Broker
端 Broker
端收到消费进度后,将该分区的消费进度持久化
一般来说,消费者所调用的poll()
方法默认情况下是自动提交,通过参数enable.auto.commit
配置,并且并不是每poll()
一次就提交一次,而是每个时间段统一提交,通过参数auto.commit.interval.ms
控制。
这里带来一个问题,如果消息不是每次
poll()
都提交,那么第一次poll()
和第二次poll()
不会拉到相同的数据么?因为offset
并没有提交。但是实际并不会这样,因为提交
offset
一是为了再均衡后将消费位移共享给其他客户端,二是为了消费者重启后,能获取上一次的消费进度。而平时,消费者都会在内存中保留了每次
poll()
后的位移,因此每次调用poll()
返回后,都会更新此位移。
**自动提交带来的问题: **
自动提交虽然方便,但是由于消费者客户端并不知道客户端的业务逻辑是否真正处理完成,因而可能会出现以下情况:
poll()
拉取到100条消息,客户端同时进行逻辑处理,然后5s后客户端才处理完50条消息,此时由于到达了应该提交的时间,因此消费者客户端上报目前消费offset
为100,上报完成后,正好客户端崩溃,当客户端重启后,从Broker
获取到的同步位移却是100,此时便出现了消息的丢失。
自动提交主要的矛盾是并不知道消息是否真正的在业务逻辑上消费完成便提交了进度,从而导致了消息的丢失。解决的办法比较简单,那就是修改为手动提交。
手动提交带来的问题:
想要手动提交数据,需要先将参数enable.auto.commit
设置为false
,然后使用如下方式手动提交:
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//do some logical processing.
}
consumer.commitSync();
}
关于手动提交,Kafka
提供了两种方式,同步提交和异步提交,需要注意的是提交位移也有可能会失败。如果失败了则需要进行重试,但是重试需要需要位移覆盖的问题,解决位移覆盖的问题的方法便是使用全局递增位移记录,当需要重试的时候对比当前重试的位移和已提交的位移的大小,如果比已提交的位移小,则不用再进行提交。
Kafka
消费者客户端提供了几种重载的位移提交方法:
commitSync(); //无参数手动提交,默认提交当前最大的消费进度
commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) //指定分区,指定位移提交
commitAsync(); //无参异步提交,同commitSync
commitAsync(OffsetCommitCallback callback); //通过回调告知返回结果的异步提交
commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback); //通过回调告知返回结果的指定分区,指定位移提交
注意,对于手动提交,需要提交的
offset
不是最后一次获取的消息位移,而是下一次消费需要拉取的位置,也就是lastConsumedOffset +1
在
Kafka
中,对于位移的概念基本都是xxx+1,比如HW
,LEO
等
手动提交虽然解决了消息丢失的问题,但是会带来重复消费的问题,因为消息位移依然是批量手动提交的,因此如果有些消息实际上已经经过业务逻辑处理,但是由于位移还没有提交,此时如果消费者客户端崩溃,则会使得之前已经处理的消息会再次被处理。
手动提交带来的问题2:
前面说了,可以通过手动提交位移,但是如果这个客户端拉取的消息被多线程消费,那么多线程的手动提交依然会有问题。
典型的模型如下:
Consumer
会作为一个生产者不断的拉取消息,然后线程池中的线程不断地消费消息。
此时就会有一个问题便是如果线程A消费的是1-10的消息,而线程B消费的是10-20消息,那么如果线程B比线程A先消费完,此时线程B应该怎么提交?
此时便可以通过一个滑动窗口的模型来解决这个问题:
主要思想便是将乱序改为有序。但是这样会造成其他线程的一定的等待时间。
再均衡带来的消息重复消费
Kafka
提供了消费者组的概念,然而对于消费者组来说,消费者客户端的数量是会动态变化的,而每个消费者则是对应着每个分区,同时在再均衡过程中,整个消费者组会变得不可用。
在再均衡发生的时候,如果某个分区拉取了并处理了消息,但是还没有来得及提交,此时消费者客户端会丢失这些状态,并变得不可用,当再均衡完成后,又会重新拉取消息进行消费,也就发生了消息重复消费的情况。
Kafka
专门提供了再均衡监听器接口(ConsumerRebalanceListener
),用来解决这个问题。再均衡监听器主要在调用subscribe()
方法的时候设置。其主要包含两个方法:
onPartitionsRevoked(Collection<TopicPartition> partitions); //在再均衡开始之前和消费者停止读取消息之后被调用,可以通过这个回调来处理消费位移的提交,partitions 表示再均衡之前分配到的分区
opPartitionsAssigend(Collection<TopicPartition> partitions): //在重新分配分区之后和消费者开始读取消费之前被调用,partitions 表示再均衡之后分配到的分区